{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Put Customer Reviews On Kinesis Data Firehose"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis-complete.png\" width=\"90%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "import json\n",
    "\n",
    "sess = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "sm = boto3.Session().client(service_name=\"sagemaker\", region_name=region)\n",
    "firehose = boto3.Session().client(service_name=\"firehose\", region_name=region)\n",
    "kinesis_analytics = boto3.Session().client(service_name=\"kinesisanalytics\", region_name=region)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r firehose_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    firehose_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(firehose_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r firehose_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    firehose_arn\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(firehose_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_kinesis_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    iam_role_kinesis_arn\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(iam_role_kinesis_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r kinesis_data_analytics_app_name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    kinesis_data_analytics_app_name\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(kinesis_data_analytics_app_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r lambda_fn_name_cloudwatch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    lambda_fn_name_cloudwatch\n",
    "except NameError:\n",
    "    print(\"+++++++++++++++++++++++++++++++\")\n",
    "    print(\"[ERROR] Please run the notebooks in this section before you continue.\")\n",
    "    print(\"+++++++++++++++++++++++++++++++\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(lambda_fn_name_cloudwatch)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "firehoses = firehose.list_delivery_streams(DeliveryStreamType=\"DirectPut\")\n",
    "\n",
    "print(json.dumps(firehoses, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 cp 's3://dsoaws/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import pandas as pd\n",
    "\n",
    "df = pd.read_csv(\n",
    "    \"./data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz\",\n",
    "    delimiter=\"\\t\",\n",
    "    quoting=csv.QUOTE_NONE,\n",
    "    compression=\"gzip\",\n",
    ")\n",
    "df.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body = df[[\"review_id\", \"star_rating\", \"product_category\", \"review_body\"]][0:1]\n",
    "\n",
    "df_star_rating_and_review_body.to_csv(sep=\"\\t\", header=None, index=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Check that Kinesis Data Analytics Application Is Running"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "import time\n",
    "\n",
    "app_status = response[\"ApplicationDetail\"][\"ApplicationStatus\"]\n",
    "\n",
    "while app_status != \"RUNNING\":\n",
    "    time.sleep(5)\n",
    "    response = kinesis_analytics.describe_application(ApplicationName=kinesis_data_analytics_app_name)\n",
    "    app_status = response[\"ApplicationDetail\"][\"ApplicationStatus\"]\n",
    "    print(\"Application status {}\".format(app_status))\n",
    "\n",
    "print(\"Application status {}\".format(app_status))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Wait For The Application Status ^^ Running ^^_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Simulate Producer Application Writing Records to the Stream"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Open Lambda (CloudWatch) Logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=%252Faws%252Flambda%252F{}\">Lambda Logs</a></b>'.format(\n",
    "            region, lambda_fn_name_cloudwatch\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Keep That ^^ Link ^^ Open In Your Browser_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Open Custom CloudWatch Metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        \"\"\"<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#metricsV2:graph=~(metrics~(~(~'kinesis*2fanalytics*2fAVGStarRating~'AVGStarRating~'Product*20Category~'All))~view~'timeSeries~stacked~false~start~'-PT5M~end~'P0D~region~'us-east-1~liveData~true~stat~'Average~period~1~title~'Avg*20Star*20Rating);query=~'*7bkinesis*2fanalytics*2fAVGStarRating*2c*22Product*20Category*22*7d\">CloudWatch Metrics</a></b>\"\"\".format(\n",
    "            region, region\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Keep That ^^ Link ^^ Open In Your Browser_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Open Kinesis Data Analytics Console UI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}\"> Kinesis Data Analytics App</a></b>'.format(\n",
    "            region, kinesis_data_analytics_app_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# _Keep That ^^ Link ^^ Open In Your Browser To See Live Records Coming In After Running Next Cells_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Put Records onto Firehose"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "firehose_response = firehose.describe_delivery_stream(DeliveryStreamName=firehose_name)\n",
    "\n",
    "print(json.dumps(firehose_response, indent=4, sort_keys=True, default=str))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "step = 1\n",
    "\n",
    "for start_idx in range(0, 500, step):\n",
    "    end_idx = start_idx + step\n",
    "\n",
    "    df_star_rating_and_review_body = df[[\"review_id\", \"product_category\", \"review_body\"]][start_idx:end_idx]\n",
    "\n",
    "    reviews_tsv = df_star_rating_and_review_body.to_csv(sep=\"\\t\", header=None, index=False)\n",
    "\n",
    "    # print(reviews_tsv.encode('utf-8'))\n",
    "\n",
    "    response = firehose.put_record(Record={\"Data\": reviews_tsv.encode(\"utf-8\")}, DeliveryStreamName=firehose_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}\"> Kinesis Data Analytics App</a></b>'.format(\n",
    "            region, kinesis_data_analytics_app_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Go To Kinesis Analytics UI: \n",
    "\n",
    "# _Note: If You See This Error `No rows in source stream`:_\n",
    "\n",
    "<img src=\"img/no_rows_in_source_kinesis_firehose_stream.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## _Click On `Source` Or `Real-Time analytics` Tab Or Re-Run ^^ Above ^^ Cell `Put Records onto Firehose`_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review S3 Kinesis Source Records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/s3/buckets/{}/kinesis-data-firehose-source-record/?region={}&tab=overview\"> S3 Source Records</a></b>'.format(\n",
    "            bucket, region\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "They should look like this: \n",
    "    \n",
    "```\n",
    "R2EI7QLPK4LF7U\tDigital_Software\tSo far so good\n",
    "R1W5OMFK1Q3I3O\tDigital_Software\tNeeds a little more work.....\n",
    "RPZWSYWRP92GI\t Digital_Software\tPlease cancel.\n",
    "R2WQWM04XHD9US\tDigital_Software\tWorks as Expected!\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Review S3 Kinesis Transformed Records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Review <a target=\"top\" href=\"https://console.aws.amazon.com/s3/buckets/{}/kinesis-data-firehose/?region={}&tab=overview\"> S3 Transformed Records</a></b>'.format(\n",
    "            bucket, region\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "They should look like this:\n",
    "    \n",
    "```\n",
    "R2EI7QLPK4LF7U\t5\tDigital_Software\tSo far so good\n",
    "R1W5OMFK1Q3I3O\t3\tDigital_Software\tNeeds a little more work.....\n",
    "RPZWSYWRP92GI\t 1\tDigital_Software\tPlease cancel.\n",
    "R2WQWM04XHD9US\t5\tDigital_Software\tWorks as Expected!\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Go To Kinesis Analytics UI: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Go To UI <a target=\"top\" href=\"https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}\"> Kinesis Data Analytics App</a></b>'.format(\n",
    "            region, kinesis_data_analytics_app_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ---- You can see our reviews streaming data coming in under the `Source` tab:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis_analytics_1.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ---- Go to `Real-time analytics` tab and select `AVG_STAR_RATING_SQL_STREAM`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis_analytics_5.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ------ Go to `Real-time analytics` tab and select `APPROXIMATE_COUNT_SQL_STREAM`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis_analytics_4.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Go To Kinesis Analytics UI and Check Anomaly Detection Score"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Go To <a target=\"top\" href=\"https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}\"> Kinesis Data Analytics App</a></b>'.format(\n",
    "            region, kinesis_data_analytics_app_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create and Put Anomaly Data Onto Stream\n",
    "Here, we are hard-coding a bad review to trigger an anomaly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "\n",
    "import time\n",
    "\n",
    "anomaly_step = 1\n",
    "\n",
    "for start_idx in range(0, 10000, anomaly_step):\n",
    "    timestamp = int(time.time())\n",
    "\n",
    "    df_anomalies = pd.DataFrame(\n",
    "        [\n",
    "            {\n",
    "                \"review_id\": str(timestamp),\n",
    "                \"product_category\": \"Digital_Software\",\n",
    "                \"review_body\": \"This is an awful waste of time.\",\n",
    "            },\n",
    "        ],\n",
    "        columns=[\"review_id\", \"star_rating\", \"product_category\", \"review_body\"],\n",
    "    )\n",
    "\n",
    "    reviews_tsv_anomalies = df_anomalies.to_csv(sep=\"\\t\", header=None, index=False)\n",
    "\n",
    "    response = firehose.put_record(\n",
    "        Record={\"Data\": reviews_tsv_anomalies.encode(\"utf-8\")}, DeliveryStreamName=firehose_name\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "display(\n",
    "    HTML(\n",
    "        '<b>Go To <a target=\"top\" href=\"https://console.aws.amazon.com/kinesisanalytics/home?region={}#/wizard/editor?applicationName={}\"> Kinesis Data Analytics App</a></b>'.format(\n",
    "            region, kinesis_data_analytics_app_name\n",
    "        )\n",
    "    )\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# ---- Go to `Real-time analytics` tab and select `ANOMALY_SCORE_SQL_STREAM`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"img/kinesis_analytics_3.png\" width=\"80%\" align=\"left\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Release Resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# %%html\n",
    "\n",
    "# <p><b>Shutting down your kernel for this notebook to release resources.</b></p>\n",
    "# <button class=\"sm-command-button\" data-commandlinker-command=\"kernelmenu:shutdown\" style=\"display:none;\">Shutdown Kernel</button>\n",
    "\n",
    "# <script>\n",
    "# try {\n",
    "#     els = document.getElementsByClassName(\"sm-command-button\");\n",
    "#     els[0].click();\n",
    "# }\n",
    "# catch(err) {\n",
    "#     // NoOp\n",
    "# }\n",
    "# </script>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# %%javascript\n",
    "\n",
    "# try {\n",
    "#     Jupyter.notebook.save_checkpoint();\n",
    "#     Jupyter.notebook.session.delete();\n",
    "# }\n",
    "# catch(err) {\n",
    "#     // NoOp\n",
    "# }"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
